package com.atguigu.day08;

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Demo job that uses broadcast state as a runtime switch.
 *
 * <p>Lines read from the socket at localhost:9999 form the broadcast (control) stream; each line
 * is stored in broadcast state under the key {@code "switch"}. Lines read from the socket at
 * hadoop102:9999 form the regular stream; for each of them exactly one message is emitted,
 * chosen by the switch value currently held in broadcast state.
 */
public class Flink01_OperatorState_Broadcast {

    /** Broadcast-state key under which the most recent control value is kept. */
    private static final String SWITCH_KEY = "switch";

    /**
     * Builds the topology and runs the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Obtain the stream execution environment and run with parallelism 1.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Create the two socket sources: control values first, then the regular data.
        DataStreamSource<String> controlSource = env.socketTextStream("localhost", 9999);
        DataStreamSource<String> dataSource = env.socketTextStream("hadoop102", 9999);

        // 3. Describe the map state (String -> String) that backs the broadcast stream.
        MapStateDescriptor<String, String> switchDescriptor =
                new MapStateDescriptor<>("map-State", String.class, String.class);

        // 4. Turn the control source into a broadcast stream bound to that descriptor.
        BroadcastStream<String> controlBroadcast = controlSource.broadcast(switchDescriptor);

        // 5. Connect the regular stream with the broadcast stream.
        BroadcastConnectedStream<String, String> joined = dataSource.connect(controlBroadcast);

        // 6. Process the connected streams and print whatever the function emits.
        SingleOutputStreamOperator<String> messages = joined.process(new SwitchFunction(switchDescriptor));
        messages.print();

        env.execute();
    }

    /**
     * Stores every broadcast element as the current switch value and, for every regular
     * element, emits a message selected by that switch value.
     */
    private static final class SwitchFunction extends BroadcastProcessFunction<String, String, String> {

        /** Descriptor used to look up the broadcast state on both sides. */
        private final MapStateDescriptor<String, String> descriptor;

        SwitchFunction(MapStateDescriptor<String, String> descriptor) {
            this.descriptor = descriptor;
        }

        @Override
        public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
            // Read-only view of the broadcast state on the regular-stream side.
            ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(descriptor);
            String mode = state.get(SWITCH_KEY);

            // Pick the message first; comparing literal-first keeps a null switch value
            // on the default branch.
            String message;
            if ("1".equals(mode)) {
                message = "执行加法逻辑。。。";
            } else if ("2".equals(mode)) {
                message = "执行乘法逻辑...";
            } else {
                message = "执行其他逻辑...";
            }

            out.collect(message);
        }

        @Override
        public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
            // Writable broadcast state: remember the latest line read from the control source.
            BroadcastState<String, String> state = ctx.getBroadcastState(descriptor);
            state.put(SWITCH_KEY, value);
        }
    }
}
